package com.atguigu.app.dwd.db;

import com.atguigu.utils.MyKafkaUtil;
import com.atguigu.utils.MysqlUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;

public class DwdTradeOrderPreProcess1 {

    public static void main(String[] args) throws Exception {

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);  //生产环境设置为Kafka的分区数
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //设置为网络最大延迟
        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5));

        //1.1 设置状态后端
        //System.setProperty("HADOOP_USER_NAME", "atguigu");
        //env.setStateBackend(new HashMapStateBackend());
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall-flink/ck");

        //1.2 开启CK
        //env.enableCheckpointing(3 * 60000L);
        //env.getCheckpointConfig().setCheckpointTimeout(5 * 60000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        //TODO 2.读取Kafka ODS层 topic_db 主题创建主表
        tableEnv.executeSql(MyKafkaUtil.getTopicDbDDL("order_pre_process1_211227"));

        //TODO 3.过滤出订单表
        Table orderInfoTable = tableEnv.sqlQuery("" +
                "select " +
                "    `data`['id'] id, " +
                "    `data`['consignee'] consignee, " +
                "    `data`['consignee_tel'] consignee_tel, " +
                "    `data`['total_amount'] total_amount, " +
                "    `data`['order_status'] order_status, " +
                "    `data`['user_id'] user_id, " +
                "    `data`['payment_way'] payment_way, " +
                "    `data`['delivery_address'] delivery_address, " +
                "    `data`['order_comment'] order_comment, " +
                "    `data`['out_trade_no'] out_trade_no, " +
                "    `data`['trade_body'] trade_body, " +
                "    `data`['create_time'] create_time, " +
                "    `data`['operate_time'] operate_time, " +
                "    `data`['expire_time'] expire_time, " +
                "    `data`['process_status'] process_status, " +
                "    `data`['tracking_no'] tracking_no, " +
                "    `data`['parent_order_id'] parent_order_id, " +
                "    `data`['province_id'] province_id, " +
                "    `data`['activity_reduce_amount'] activity_reduce_amount, " +
                "    `data`['coupon_reduce_amount'] coupon_reduce_amount, " +
                "    `data`['original_total_amount'] original_total_amount, " +
                "    `data`['feight_fee'] feight_fee, " +
                "    `data`['feight_fee_reduce'] feight_fee_reduce, " +
                "    `data`['refundable_time'] refundable_time, " +
                "    `type`, " +
                "    `old`, " +
                "    pt " +
                "from topic_db " +
                "where `database` = 'gmall-211227-flink' " +
                "and `table` = 'order_info' " +
                "and (`type` = 'insert' or `type` = 'update')");
        tableEnv.createTemporaryView("order_info", orderInfoTable);
        //tableEnv.toAppendStream(orderInfoTable, Row.class).print(">>>>>>>>");

        //TODO 4.过滤出订单明细表
        Table orderDetailTable = tableEnv.sqlQuery("" +
                "select " +
                "    `data`['id'] id, " +
                "    `data`['order_id'] order_id, " +
                "    `data`['sku_id'] sku_id, " +
                "    `data`['sku_name'] sku_name, " +
                "    `data`['order_price'] order_price, " +
                "    `data`['sku_num'] sku_num, " +
                "    `data`['create_time'] create_time, " +
                "    `data`['source_type'] source_type, " +
                "    `data`['source_id'] source_id, " +
                "    `data`['split_total_amount'] split_total_amount, " +
                "    `data`['split_activity_amount'] split_activity_amount, " +
                "    `data`['split_coupon_amount'] split_coupon_amount " +
                "from topic_db " +
                "where `database` = 'gmall-211227-flink'  " +
                "and `table` = 'order_detail' " +
                "and `type` = 'insert'");
        tableEnv.createTemporaryView("order_detail", orderDetailTable);
        //tableEnv.toAppendStream(orderDetailTable, Row.class).print(">>>>>>>>>>>>");

        //TODO 5.过滤出订单明细活动表
        Table orderActivityTable = tableEnv.sqlQuery("" +
                "select " +
                "    `data`['id'] id, " +
                "    `data`['order_id'] order_id, " +
                "    `data`['order_detail_id'] order_detail_id, " +
                "    `data`['activity_id'] activity_id, " +
                "    `data`['activity_rule_id'] activity_rule_id, " +
                "    `data`['sku_id'] sku_id, " +
                "    `data`['create_time'] create_time " +
                "from topic_db " +
                "where `database` = 'gmall-211227-flink'  " +
                "and `table` = 'order_detail_activity' " +
                "and `type` = 'insert'");
        tableEnv.createTemporaryView("order_activity", orderActivityTable);
        //tableEnv.toAppendStream(orderActivityTable, Row.class).print("order_activity>>>>>>>>>>>");

        //TODO 6.过滤出订单明细购物券表
        Table orderCouponTable = tableEnv.sqlQuery("" +
                "select " +
                "    `data`['id'] id, " +
                "    `data`['order_id'] order_id, " +
                "    `data`['order_detail_id'] order_detail_id, " +
                "    `data`['coupon_id'] coupon_id, " +
                "    `data`['coupon_use_id'] coupon_use_id, " +
                "    `data`['sku_id'] sku_id, " +
                "    `data`['create_time'] create_time " +
                "from topic_db " +
                "where `database` = 'gmall-211227-flink'  " +
                "and `table` = 'order_detail_coupon' " +
                "and `type` = 'insert'");
        tableEnv.createTemporaryView("order_coupon", orderCouponTable);
        //tableEnv.toAppendStream(orderCouponTable, Row.class).print("order_coupon>>>>>>>>>>>>");

        //TODO 7.构建MySQL中 base_dic LookUp表
        tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());

        //TODO 8.关联5张表
        Table resultTable = tableEnv.sqlQuery("" +
                "select " +
                "    oi.consignee, " +
                "    oi.consignee_tel, " +
                "    oi.total_amount, " +
                "    oi.order_status, " +
                "    oi.user_id, " +
                "    oi.payment_way, " +
                "    oi.delivery_address, " +
                "    oi.order_comment, " +
                "    oi.out_trade_no, " +
                "    oi.trade_body, " +
                "    oi.create_time, " +
                "    oi.operate_time, " +
                "    oi.expire_time, " +
                "    oi.process_status, " +
                "    oi.tracking_no, " +
                "    oi.parent_order_id, " +
                "    oi.province_id, " +
                "    oi.activity_reduce_amount, " +
                "    oi.coupon_reduce_amount, " +
                "    oi.original_total_amount, " +
                "    oi.feight_fee, " +
                "    oi.feight_fee_reduce, " +
                "    oi.refundable_time, " +
                "    od.id order_detail_id, " +
                "    od.order_id, " +
                "    od.sku_id, " +
                "    od.sku_name, " +
                "    od.order_price, " +
                "    od.sku_num, " +
                "    od.source_type, " +
                "    od.source_id, " +
                "    od.split_total_amount, " +
                "    od.split_activity_amount, " +
                "    od.split_coupon_amount, " +
                "    oa.id order_detail_activity_id, " +
                "    oa.activity_id, " +
                "    oa.activity_rule_id, " +
                "    oc.id order_detail_coupon_id, " +
                "    oc.coupon_id, " +
                "    oc.coupon_use_id, " +
                "    dic.dic_name " +
                "from order_info oi " +
                "join order_detail od " +
                "on oi.id = od.order_id " +
                "left join order_activity oa " +
                "on od.id = oa.order_detail_id " +
                "left join order_coupon oc " +
                "on od.id = oc.order_detail_id " +
                "join base_dic FOR SYSTEM_TIME AS OF oi.pt dic " +
                "on od.source_type = dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);
        //tableEnv.toRetractStream(resultTable, Row.class).print(">>>>>>>>>>>");

        //TODO 9.构建DWD层订单预处理表  upsert-kafka
        tableEnv.executeSql("" +
                "create table dwd_order_pre( " +
                "    `consignee` String, " +
                "    `consignee_tel` String, " +
                "    `total_amount` String, " +
                "    `order_status` String, " +
                "    `user_id` String, " +
                "    `payment_way` String, " +
                "    `delivery_address` String, " +
                "    `order_comment` String, " +
                "    `out_trade_no` String, " +
                "    `trade_body` String, " +
                "    `create_time` String, " +
                "    `operate_time` String, " +
                "    `expire_time` String, " +
                "    `process_status` String, " +
                "    `tracking_no` String, " +
                "    `parent_order_id` String, " +
                "    `province_id` String, " +
                "    `activity_reduce_amount` String, " +
                "    `coupon_reduce_amount` String, " +
                "    `original_total_amount` String, " +
                "    `feight_fee` String, " +
                "    `feight_fee_reduce` String, " +
                "    `refundable_time` String, " +
                "    `order_detail_id` String, " +
                "    `order_id` String, " +
                "    `sku_id` String, " +
                "    `sku_name` String, " +
                "    `order_price` String, " +
                "    `sku_num` String, " +
                "    `source_type` String, " +
                "    `source_id` String, " +
                "    `split_total_amount` String, " +
                "    `split_activity_amount` String, " +
                "    `split_coupon_amount` String, " +
                "    `order_detail_activity_id` String, " +
                "    `activity_id` String, " +
                "    `activity_rule_id` String, " +
                "    `order_detail_coupon_id` String, " +
                "    `coupon_id` String, " +
                "    `coupon_use_id` String, " +
                "    `dic_name` String, " +
                "    PRIMARY KEY (order_detail_id) NOT ENFORCED  " +
                ")" + MyKafkaUtil.getUpsertKafkaDDL("dwd_trade_order_pre_process"));

        //TODO 10.将数据写出
        tableEnv.executeSql("insert into dwd_order_pre select * from result_table");

        //启动任务
        //env.execute("DwdTradeOrderPreProcess1");

    }

}
